采用MQTT方式发布图像数据

技术经验 dingxiao 阅读数:2011 2018年7月16日 15:36

采用MQTT方式发布图像数据

1.图像数据(jpg)转网络IOT实时显示

参考文档地址:[VideoAndMQTT](https://github.com/titos-carrasco/VideoAndMQTT)

该项目采用python实现,其中参考了html实现的方法,代码为:

  <!DOCTYPE html PUBLIC '-//W3C//DTD XHTML 1.0 Strict//EN' 'http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd'>
<html xmlns='http://www.w3.org/1999/xhtml' xml:lang='es'>
<head>
<title>Video and WS/MQTT (paho lib)</title>
<meta http-equiv='content-type' content='text/html;charset=utf-8' />
<script src='https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js' type='text/javascript'></script>
<script type='text/javascript'>
// put here the mqtt connections parameters
var streams = [
   [ "xx.xx.xx.xx", xxxx, "clientId", "pic", "rcr_" + createUUID() ],
   // [ "broker.hivemq.com", 8000, "/mqtt", "demos/rcr/video2", "rcr_" + createUUID() ],
   // [ "broker.hivemq.com", 8000, "/mqtt", "demos/rcr/video3", "rcr_" + createUUID() ],
];

function createUUID() {
   var s = [];
   var hexDigits = "0123456789abcdef";
   for (var i = 0; i < 36; i++) {
       s[i] = hexDigits.substr(Math.floor(Math.random() * 0x10), 1);
   }
   s[14] = "4";  // bits 12-15 of the time_hi_and_version field to 0010
   s[19] = hexDigits.substr((s[19] & 0x3) | 0x8, 1);  // bits 6-7 of the clock_seq_hi_and_reserved to 01
   s[8] = s[13] = s[18] = s[23] = "-";

   var uuid = s.join("");
   return uuid;
}

function Connect()
{
   for( i=0; i<streams.length; i++ ){
       img = document.createElement( 'img' );
       img.width = "320";
       img.height = "240";
       img.src='';
       img.id = 'image_' + i;
       container = document.getElementById( 'container' );
       container.appendChild( img );

       stream = streams[i]
       // client = new Paho.MQTT.Client( stream[0],stream[1], stream[2], stream[4] );
       client = new Paho.MQTT.Client( stream[0],stream[1], stream[2]);
       client.idx = i

       client.onMessageArrived = function ( message ){
           data = message.payloadbytes;
           img = document.getelementbyid( 'image_' + this.idx )
           img.src = 'data:image/png;base64,' +  btoa( string.fromcharcode.apply( null, data ) );
       }.bind( client );

       function onConnect(){
           stream = streams[ this.idx ]
           this.subscribe( stream[3] );
       }

       client.connect( { onSuccess: onConnect.bind( client ) } );
   }
}
</script>
</head>
<body onload='Connect();'>
<h1>Video and WS/MQTT (paho lib)</h1>
<div id='container'></div>
</body>
</html>

其中关键代码为:

  client.onMessageArrived = function ( message ){
data = message.payloadbytes;
img = document.getelementbyid( 'image_' + this.idx )
img.src = 'data:image/png;base64,' +  btoa( string.fromcharcode.apply( null, data ) );
}.bind( client );

代码实现了html端接收MQTT发送的图片数据,并解码的过程。

2.MQTT发送端初始化操作

发送端使用Java实现 ,发送端MQTT初始化代码为:

  private void Mqtt_Init()
{
MemoryPersistence persistence = new MemoryPersistence();

try {
       sampleClient = new MqttAsyncClient(broker, "dx", persistence);
       MqttConnectOptions connOpts = new MqttConnectOptions();
       connOpts.setCleanSession(true);
       sampleClient.setCallback(null);
       System.out.println("Connecting to broker: " + broker);
       sampleClient.connect(connOpts);
       System.out.println("Connected");
       Thread.sleep(1000);
       sampleClient.subscribe("dx", 0);
       System.out.println("Subscribed");

   } catch (Exception me) {
       if (me instanceof MqttException) {
           System.out.println("reason " + ((MqttException) me).getReasonCode());
       }
       System.out.println("msg " + me.getMessage());
       System.out.println("loc " + me.getLocalizedMessage());
       System.out.println("cause " + me.getCause());
       System.out.println("excep " + me);
       me.printStackTrace();
   }
}

上述配置中,连接节点必须配置callback。

3.MQTT图像数据转码及发布操作

图片转码参考网址:Convert Image to Base64 String or Base64 String to Image in Java.参考代码:

  package com;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

import org.apache.commons.codec.binary.Base64;

public class ImageToBase64 {
public static void main(String...s) throws Exception{
 
 //encode image to Base64 String
 File f = new File("H:/encode/sourceImage.png");  //change path of image according to you
 FileInputStream fis = new FileInputStream(f);
 byte byteArray[] = new byte[(int)f.length()];
 fis.read(byteArray);
 String imageString = Base64.encodeBase64String(byteArray);
 
 //decode Base64 String to image
 FileOutputStream fos = new FileOutputStream("H:/decode/destinationImage.png"); //change path of image according to you
 byteArray = Base64.decodeBase64(imageString);
 fos.write(byteArray);

 fis.close();
 fos.close();
}
}

发送端对采集到的图片数据进行转码及MQTT发布操作代码为:

  private void Mqtt_Send_Pic()
{
MainController_opencv.Mqtt_Lock = true;
  try {

      try {
  ImageIO.write(ImageIO.read(new File("/home/pi/mqtt.png")), "JPEG", new File("/home/pi/mqtt.jpg"));
 } catch (IOException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }

 //encode image to Base64 String
 File f = new File("/home/pi/mqtt.jpg");  //change path of image according to you
 FileInputStream fis = new FileInputStream(f);
 byte byteArray[] = new byte[(int)f.length()];
 fis.read(byteArray);
 //String imageString = Base64.encodeBase64String(byteArray);
 sampleClient.publish("pic", byteArray, 0, false);
 fis.close();

} catch (Exception e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
}
   MainController_opencv.Mqtt_Lock = false;
}

图片数据由于是采用opencv方式采集,原始数据最先处理成png图片格式,所以需要转码成jpg图片格式才能满足发布要求,故需先转码。发布时图片数据被处理成字节流方式进行发布。

4.MQTT自动断连检测

在使用MQTT进行图片数据发布时,由于网络环境及小型单板计算机能力的局限性,造成MQTT发布时遇到断连问题,在实际测试中也经常出现,断连后,MQTT本身自属的机制可以自动检测到,并可以实施断连后的自动连接。

在MQTT发布接口处增加异常捕捉:

  sampleClient.publish("pic", byteArray, 0, false);

捕捉代码为:

catch (Exception e)
{
    // TODO Auto-generated catch block
    //e.printStackTrace();
    System.out.println("send error");
    MQTT_ConectLost_Flag = false;
    //@-mqtt接口初始化
    Mqtt_Init();
}


captcha
    暂无评论